Initial Data Transformation¶
Library imports¶
Use pip or conda to install the required packages/libraries before running the notebook.
#!pip install -r requirements.txt
import requests
from pathlib import Path
from datetime import datetime
from IPython.display import Markdown
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import h3
import h3pandas # More specialised H3 functions, specifically for use with Pandas
# Pandas alternatives (just for speed comparisons)
import vaex
import dask
#import polars
#import cudf
# Imports for interactive map visualisation.
# (Must mark this notebook as Trusted to see it.)
import folium
from folium.plugins import HeatMap, HeatMapWithTime
#import pydeck
Display settings¶
Set the notebook display option to show all columns in full.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
Data folders¶
This checks if the data files for the challenge are present, otherwise it downloads them from the S3 bucket.
Original files are stored in the data folder, while processed output will be written to the output folder.
data_url = 'https://cct-ds-code-challenge-input-data.s3.af-south-1.amazonaws.com/'
data_folder = Path('./data/')
output_folder = Path('./output/')
data_files = [
'sr.csv.gz',
'sr_hex.csv.gz',
#'sr_hex_truncated.csv',
'city-hex-polygons-8.geojson',
#'city-hex-polygons-8-10.geojson',
]
# Create `data` and `output` subfolders if they don't exist yet
data_folder.mkdir(parents=False, exist_ok=True)
output_folder.mkdir(parents=False, exist_ok=True)
for data_file in data_files:
file_path = data_folder / data_file
if not file_path.exists():
display(Markdown(f'Downloading *{data_file}*...'))
file_url = data_url + data_file
response = requests.get(file_url)
response.raise_for_status()
with open(file_path, 'wb') as f:
f.write(response.content)
else:
display(Markdown(f'*{data_file}* already exists.'))
display(Markdown('**Done.**'))
sr.csv.gz already exists.
sr_hex.csv.gz already exists.
city-hex-polygons-8.geojson already exists.
Done.
Schema inspection¶
The first part of the task involves joining the features from city-hex-polygons-8.geojson to the service request entries in sr_hex.csv.gz.
Let's see what the schema looks like by just showing 1 item from each...
pd.read_csv(data_folder / 'sr.csv.gz', nrows=1, index_col=0)
| notification_number | reference_number | creation_timestamp | completion_timestamp | directorate | department | branch | section | code_group | code | cause_code_group | cause_code | official_suburb | latitude | longitude | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 400583534 | 9109491785 | 2020-10-07 06:55:18+02:00 | 2020-10-08 15:36:35+02:00 | URBAN MOBILITY | Roads Infrastructure Management | RIM Area Central | District: Blaauwberg | TD Customer complaint groups | Pothole&Defect Road Foot Bic Way/Kerbs | Road (RCL) | Wear and tear | MONTAGUE GARDENS | -33.872839 | 18.522488 |
gpd.read_file(data_folder / 'city-hex-polygons-8.geojson', index_col='index').head(1)
| index | centroid_lat | centroid_lon | geometry | |
|---|---|---|---|---|
| 0 | 88ad361801fffff | -33.859427 | 18.677843 | POLYGON ((18.68119 -33.86330, 18.68357 -33.85928, 18.68023 -33.85541, 18.67450 -33.85555, 18.67211 -33.85957, 18.67546 -33.86345, 18.68119 -33.86330)) |
Calculate the hexagon index¶
There appears to be no common key column that we can join on. We can calculate the H3 hexagon index from the latitude and longitude of each service request. That can then be matched up with the index of the geojson feature.
(We could also have done a point-in-polygon check on the polygon geometry, but that would have been much less efficient. Doing a search for the nearest centroid would be similarly inefficient, while also potentially inaccurate for points in the outer corners of the hexagon.)
%%time
# Load the CSV file into a Pandas DataFrame
df = pd.read_csv(data_folder / 'sr.csv.gz', index_col=0)
# Replace NaN values in the latitude and longitude columns with 0
# NOTE: This is optional - h3pandas can work with NaNs.
#df[['latitude', 'longitude']] = df[['latitude', 'longitude']].fillna(value=0)
# Create a h3pandas DataFrame, calculating the h3 id from the latitude and longitude
h3df = df[['latitude', 'longitude']].h3.geo_to_h3(resolution=8, lat_col='latitude', lng_col='longitude')
# Append the h3 id column to the original DataFrame
df['h3_level8_index'] = h3df.index.to_frame(index=False)
# Set the h3 id to '0' if coordinates are missing
# NOTE: This is optional - h3pandas already does it internally.
#df.loc[df['latitude'].isnull() | df['longitude'].isnull(), 'h3_level8_index'] = 0
# Keep for later (validation)
df_pandas = df.copy()
CPU times: total: 2.53 s Wall time: 2.53 s
Validation¶
Check to make sure our results match the reference CSV file.
# Check if the Pandas DataFrame contains the same values as the provided reference file
df_csv = pd.read_csv(data_folder / 'sr_hex.csv.gz', index_col=None)
matched = df_pandas.equals(df_csv)
display(Markdown(f'Pandas == sr_hex.csv: **{matched}**'))
Pandas == sr_hex.csv: True
Join on hexagon index¶
Now that we have a common key, we can merge the two DataFrames together.
%%time
# Read CSV file into Vaex dataframe
df_sr = vaex.from_csv(data_folder / 'sr_hex.csv.gz', index_col=None)
#print(len(df_sr)) # 941634
# Read GeoJSON file into GeoPandas dataframe
df_geo = gpd.read_file(data_folder / 'city-hex-polygons-8.geojson', index_col='index')
#print(len(df_geo)) # 3832
# Rename key column to match the other dataframe
df_sr.rename('h3_level8_index', 'index')
# Convert from GeoPandas to Vaex dataframe, depending on what kind of dataframe we want after merging
df_geo = vaex.from_pandas(df_geo)
#print(type(df_geo)) # <class 'vaex.dataframe.DataFrameLocal'>
# Merge the dataframes on the common index column
# NOTE: Vaex does not support outer join yet,
# so we do a left join, which keeps all the service requests,
# but discards any unmatched (empty) hexagon features.
merged = df_sr.join(df_geo, how='left', on='index', allow_duplication=False)
#print(len(merged)) # 941634 (same as service request count)
print(f'Column names after merge: { merged.column_names }\n')
#merged.head(5)
Column names after merge: ['notification_number', 'reference_number', 'creation_timestamp', 'completion_timestamp', 'directorate', 'department', 'branch', 'section', 'code_group', 'code', 'cause_code_group', 'cause_code', 'official_suburb', 'latitude', 'longitude', 'index', 'centroid_lat', 'centroid_lon', 'geometry'] CPU times: total: 2.64 s Wall time: 2.63 s
Find unmatched entries¶
Even without merging, a comparison of the hex id's on both sides will indicate if there are any rows that won't match.
Check for service requests with hex id's that don't exist in the provided GeoJSON.
Check for hex id's in the GeoJSON that have no associated service requests.
# Count number of service requests without coordinates
num_zero = len(df_sr[df_sr['index']=='0'])
# Get list of h3 index strings from both dataframes
sr_list = df_sr['index'].tolist()
geo_list = df_geo['index'].tolist()
# Convert to list to remove duplicates
sr_set = set(sr_list)
geo_set = set(geo_list)
# Only keep non-zero h3 id's
sr_set.discard('0')
# Get the difference between the sets
sr_unmatched = sr_set - geo_set
geo_unmatched = geo_set - sr_set
# Display some stats
display(Markdown(f'## Statistics'))
display(Markdown(' '))
display(Markdown(f'Total service requests: **{ len(sr_list) }**'))
display(Markdown(f'- Without coordinates: **{ num_zero }**'))
display(Markdown(f'- Inside unlisted hexagons: **{ len(sr_unmatched) }**'))
display(Markdown(f'`{ list(sr_unmatched) }`'))
display(Markdown(' '))
display(Markdown(f'Total listed hexagons: **{ len(geo_set) }**'))
display(Markdown(f'- With service requests: **{ len(sr_set) }**'))
display(Markdown(f'- Without service requests: **{ len(geo_unmatched) }**'))
display(Markdown(' '))
Statistics¶
Â
Total service requests: 941634
- Without coordinates: 212364
- Inside unlisted hexagons: 2
['88ad36c629fffff', '88ad361b51fffff']
Â
Total listed hexagons: 3832
- With service requests: 2082
- Without service requests: 1752
Â
Error threshold¶
It is not entirely clear what the intent is with this part of the challenge instructions:
Include logging that lets the executor know how many of the records failed to join, and include a join error threshold above which the script will error out. Please motivate why you have selected the error threshold that you have.
Maybe it's for SQL solutions? Even a 100% failure rate doesn't necessarily mean there is a problem with the script. People might just be leaving out the coordinates when creating service requests, which would lead to an unjoined row that will not be shown on the map. Instead of triggering errors, these incomplete entries could instead be flagged for clean-up before even starting the merge.
Distributed computing¶
Vaex already makes it possible to process the data in chunks, or across multiple processes on a single computer. But if the idea is to split the work across multiple machines on a network, we can use a Dask cluster. The syntax is similar to Pandas and Vaex. It can even utilise the GPU (with RAPIDS cuDF).
For now, we stick with just Vaex, though.
Grouping / Aggregation¶
Show the total number of service requests per:
- Suburb (
official_suburb) - Directorate (
directorate) - Month (new column)
- hexagon (
index)
# Add calculated MMM YYYY column
# Define a function to extract month name and year from timestamp string
def get_month_name(timestamp_str):
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S%z')
return timestamp.strftime('%b %Y')
# Create a new column called 'month_name' containing the short month name and year format
merged['month_name'] = merged['creation_timestamp'].apply(get_month_name)
# List of months
print(set(merged['month_name'].tolist()))
# List of directorates
print(set(merged['directorate'].tolist()))
{'Jun 2020', 'Dec 2020', 'Jul 2020', 'May 2020', 'Feb 2020', 'Oct 2020', 'Aug 2020', 'Sep 2020', 'Apr 2020', 'Jan 2020', 'Mar 2020', 'Nov 2020'}
{'HUMAN SETTLEMENTS', 'SAFETY AND SECURITY', 'SPATIAL PLANNING AND ENVIRONMENT', 'WATER AND SANITATION', 'ENERGY', 'URBAN MOBILITY', 'COMMUNITY SERVICES AND HEALTH', 'OFFICE OF THE CITY MANAGER', 'ECONOMIC GROWTH', 'URBAN WASTE MANAGEMENT', 'FINANCE', 'CORPORATE SERVICES', None}
%%time
# Service requests by suburb for specific directorate and month
df_only_water = merged[merged['directorate'] == 'WATER AND SANITATION']
df_only_recent = df_only_water[df_only_water['month_name'] == 'Dec 2020']
agg = df_only_recent.groupby(['official_suburb']).agg({'per_suburb': 'count'})
agg = agg.sort('per_suburb', ascending=False)
agg.head(10)
CPU times: total: 625 ms Wall time: 1.99 s
| # | official_suburb | per_suburb |
|---|---|---|
| 0 | -- | 4140 |
| 1 | PHILIPPI | 1131 |
| 2 | GUGULETU | 602 |
| 3 | STRAND | 492 |
| 4 | MANENBERG | 487 |
| 5 | BONTEHEUWEL | 483 |
| 6 | HOUT BAY | 483 |
| 7 | LOTUS RIVER | 433 |
| 8 | TAFELSIG | 427 |
| 9 | EERSTERIVIER | 388 |
Preliminary visualisations¶
Some plots and charts to get a sense of the data before building a proper dashboard.
First prepare all the hexagon shapes, then all the individual points. Then plot it all on a Folium map.
df = merged
#df = df_only_water
# Use only rows with valid hexagon id
df = df[df['index']!='0']
# Convert from Vaex to Pandas dataframe
df = df.to_pandas_df()
# Convert from Pandas to GeoPandas dataframe
gdf = gpd.GeoDataFrame(df, geometry='geometry')
# Group by h3 hexagon and add a column for number of entries per hexagon
grouped_gdf = gdf.groupby(['index']).size().reset_index(name='h3_sr_count')
# Merge columns we want to plot
gdf_with_count = gdf[['index', 'geometry']].drop_duplicates(subset=['index']).merge(grouped_gdf)
# Concatenate with unmatched
gdf_unmatched = gpd.GeoDataFrame(df_geo[df_geo['index'].isin(geo_unmatched)][['index', 'geometry']].to_pandas_df(), geometry='geometry')
gdf_unmatched['h3_sr_count'] = 0 # Add a h3_sr_count column of 0 for all
gdf_final = pd.concat([gdf_with_count, gdf_unmatched], ignore_index=True)
gdf_final
| index | geometry | h3_sr_count | |
|---|---|---|---|
| 0 | 88ad360225fffff | POLYGON ((18.52747 -33.87500, 18.52985 -33.87098, 18.52651 -33.86711, 18.52078 -33.86724, 18.51840 -33.87126, 18.52174 -33.87514, 18.52747 -33.87500)) | 118 |
| 1 | 88ad36d5e1fffff | POLYGON ((18.85504 -34.08429, 18.85743 -34.08028, 18.85407 -34.07641, 18.84833 -34.07656, 18.84594 -34.08057, 18.84930 -34.08444, 18.85504 -34.08429)) | 813 |
| 2 | 88ad36d437fffff | POLYGON ((18.82350 -34.10880, 18.82589 -34.10479, 18.82253 -34.10092, 18.81679 -34.10107, 18.81440 -34.10508, 18.81776 -34.10894, 18.82350 -34.10880)) | 712 |
| 3 | 88ad361133fffff | POLYGON ((18.61103 -33.92441, 18.61341 -33.92040, 18.61007 -33.91652, 18.60434 -33.91666, 18.60195 -33.92068, 18.60530 -33.92455, 18.61103 -33.92441)) | 1995 |
| 4 | 88ad361709fffff | POLYGON ((18.45491 -33.98741, 18.45730 -33.98341, 18.45396 -33.97953, 18.44823 -33.97966, 18.44584 -33.98367, 18.44918 -33.98754, 18.45491 -33.98741)) | 848 |
| ... | ... | ... | ... |
| 3829 | 88ad369711fffff | POLYGON ((18.47441 -34.36137, 18.47681 -34.35739, 18.47346 -34.35353, 18.46772 -34.35366, 18.46533 -34.35764, 18.46867 -34.36150, 18.47441 -34.36137)) | 0 |
| 3830 | 88ad369715fffff | POLYGON ((18.48255 -34.35726, 18.48494 -34.35328, 18.48159 -34.34942, 18.47585 -34.34955, 18.47346 -34.35353, 18.47681 -34.35739, 18.48255 -34.35726)) | 0 |
| 3831 | 88ad369717fffff | POLYGON ((18.47346 -34.35353, 18.47585 -34.34955, 18.47250 -34.34569, 18.46676 -34.34581, 18.46437 -34.34980, 18.46772 -34.35366, 18.47346 -34.35353)) | 0 |
| 3832 | 88ad369733fffff | POLYGON ((18.48063 -34.34158, 18.48303 -34.33759, 18.47968 -34.33373, 18.47394 -34.33386, 18.47155 -34.33784, 18.47490 -34.34170, 18.48063 -34.34158)) | 0 |
| 3833 | 88ad369739fffff | POLYGON ((18.49068 -34.35315, 18.49307 -34.34917, 18.48972 -34.34531, 18.48398 -34.34543, 18.48159 -34.34942, 18.48494 -34.35328, 18.49068 -34.35315)) | 0 |
3834 rows × 3 columns
# Get only relevant service request columns
datapoints = df_sr[['notification_number', 'cause_code', 'latitude', 'longitude']].to_pandas_df()
# Only keep points with coordinates
datapoints = datapoints.dropna(subset=['latitude', 'longitude'])
# Replace latitude and longitude with a Point geometry
datapoints['geometry'] = datapoints.apply(lambda row: Point(row['longitude'], row['latitude']), axis=1)
# Convert GeoPandas dataframe
datapoints = gpd.GeoDataFrame(datapoints, geometry='geometry')
# Set CRS (Coordinate Reference System) to WGS 84 datum
datapoints = datapoints.set_crs(epsg=4326)
datapoints
| notification_number | cause_code | latitude | longitude | geometry | |
|---|---|---|---|---|---|
| 0 | 400583534 | Wear and tear | -33.872839 | 18.522488 | POINT (18.52249 -33.87284) |
| 1 | 400555043 | Vandalism | -34.078916 | 18.848940 | POINT (18.84894 -34.07892) |
| 2 | 400589145 | Vandalism | -34.102242 | 18.821116 | POINT (18.82112 -34.10224) |
| 3 | 400538915 | Wear and tear | -33.920019 | 18.607209 | POINT (18.60721 -33.92002) |
| 4 | 400568554 | Surfacing failure | -33.987400 | 18.453760 | POINT (18.45376 -33.98740) |
| ... | ... | ... | ... | ... | ... |
| 941627 | 1016508423 | None | -33.922607 | 18.498545 | POINT (18.49854 -33.92261) |
| 941629 | 1016508425 | None | -33.931571 | 18.452159 | POINT (18.45216 -33.93157) |
| 941630 | 1016508432 | Foreign Objects | -33.783246 | 18.716554 | POINT (18.71655 -33.78325) |
| 941632 | 1016508442 | None | -33.971099 | 18.659831 | POINT (18.65983 -33.97110) |
| 941633 | 1016508443 | None | -34.088078 | 18.862550 | POINT (18.86255 -34.08808) |
729270 rows × 5 columns
# Calculate centerpoint before setting CRS
mean_latlon = [gdf_final.geometry.centroid.y.mean(), gdf_final.geometry.centroid.x.mean()]
# Set CRS (Coordinate Reference System) to WGS 84 datum
gdf_crs = gdf_final.set_crs(epsg=4326)
# Normalise count values (0..1)
#gdf_crs['normalised_count'] = gdf_crs['h3_sr_count']/gdf_crs['h3_sr_count'].max()
# Create a map object
m = folium.Map(location=mean_latlon, zoom_start=10, tiles='cartodbpositron')
# Add choropleth layer(s)
choropleth0 = folium.Choropleth(
geo_data=gdf_crs[gdf_crs['h3_sr_count']==0],
name='choropleth0',
data=gdf_crs[gdf_crs['h3_sr_count']==0],
columns=['index', 'h3_sr_count'],
key_on='feature.properties.index',
fill_color='Greys', # ColorBrewer code
fill_opacity=0.5,
line_opacity=0.5,
legend_name='No Service Requests'
)
choropleth1 = folium.Choropleth(
geo_data=gdf_crs[gdf_crs['h3_sr_count']>0],
name='choropleth1',
data=gdf_crs[gdf_crs['h3_sr_count']>0],
columns=['index', 'h3_sr_count'],
key_on='feature.properties.index',
fill_color='YlOrRd', # ColorBrewer code
fill_opacity=0.5,
line_opacity=0.5,
legend_name='Service Request Count'
)
# Add the created layers to the map
choropleth0.add_to(m)
choropleth1.add_to(m)
# Create a marker per individual service request
#markers = folium.GeoJson(datapoints.to_json(), name='Points', tooltip=folium.features.GeoJsonTooltip(fields=['cause_code'])) # Too slow...
#markers.add_to(m)
# Add a heatmap layer instead of individual points
HeatMap(datapoints[['latitude', 'longitude']],
radius=3,
blur=5,
min_opacity=1,
gradient={0: 'blue', 0.25: 'cyan', 0.5: 'yellow', 0.75: 'orange', 1: 'red'},
overlay=True,
control=True,
show=True,
name='heatmap',
legend_name='Service Request Heatmap'
).add_to(m)
# Add additional style/theme options.
#folium.TileLayer('OpenStreetMap', control=True).add_to(m)
#folium.TileLayer('Stamen Toner').add_to(m)
#folium.TileLayer('Stamen Terrain').add_to(m)
#folium.TileLayer('Stamen Water Color').add_to(m)
#folium.TileLayer('cartodbpositron').add_to(m)
#folium.TileLayer('cartodbdark_matter').add_to(m)
# Make it toggleable
folium.LayerControl().add_to(m)
m